{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "6f938540",
   "metadata": {},
   "source": [
    "# An end-to-end Vertex Training Pipeline Demonstration"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "62f7de01",
   "metadata": {},
   "source": [
    "Before you begin, check that the required packages are installed correctly. The KFP SDK version should be >=1.6:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "9222df33",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "KFP SDK version: 1.8.2\r\n"
     ]
    }
   ],
   "source": [
    "# Import kfp inside the kernel so the reported version is the one this\n",
    "# notebook actually uses. A `!python3 -c ...` subprocess resolves `python3`\n",
    "# through PATH and may report a different environment's installation.\n",
    "import kfp\n",
    "\n",
    "print('KFP SDK version: {}'.format(kfp.__version__))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "164ff1c9",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "import json\n",
    "from functools import partial\n",
    "\n",
    "import kfp\n",
    "import pprint\n",
    "import yaml\n",
    "from jinja2 import Template\n",
    "from kfp.v2 import dsl\n",
    "from kfp.v2.compiler import compiler\n",
    "from kfp.v2.dsl import Dataset\n",
    "from kfp.v2.google.client import AIPlatformClient"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "id": "dbe612cb",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Google Cloud project identifiers. The project number is used further down\n",
    "# to build the compute service account address for custom jobs.\n",
    "project_id = 'woven-rush-197905'\n",
    "project_number = '297370817971'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "id": "27224ab9",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Registry location and repository name. Both are rendered into the component\n",
    "# templates and into the training/serving container image URIs.\n",
    "af_registry_location = 'asia-southeast1'\n",
    "af_registry_name = 'mlops-vertex-kit'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "id": "cfe4d02b",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Root folder with one sub-folder per component; each sub-folder is expected\n",
    "# to contain a `component.yaml.jinja` template.\n",
    "components_dir = '../components/'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "id": "82030efb",
   "metadata": {
    "jupyter": {
     "outputs_hidden": false
    },
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "def _load_custom_component(project_id: str,\n",
    "                           af_registry_location: str,\n",
    "                           af_registry_name: str,\n",
    "                           components_dir: str,\n",
    "                           component_name: str):\n",
    "  \"\"\"Renders a component's Jinja template and loads it as a KFP component.\n",
    "\n",
    "  The template is read from\n",
    "  `<components_dir>/<component_name>/component.yaml.jinja`, rendered with the\n",
    "  project and registry values, and the resulting text is passed to\n",
    "  `kfp.components.load_component_from_text`.\n",
    "\n",
    "  Args:\n",
    "    project_id: Value substituted for `project_id` in the template.\n",
    "    af_registry_location: Value substituted for `af_registry_location`.\n",
    "    af_registry_name: Value substituted for `af_registry_name`.\n",
    "    components_dir: Directory that contains one sub-folder per component.\n",
    "    component_name: Name of the component's sub-folder.\n",
    "\n",
    "  Returns:\n",
    "    Whatever `kfp.components.load_component_from_text` returns for the\n",
    "    rendered component text.\n",
    "\n",
    "  Raises:\n",
    "    OSError: If the template file cannot be opened or read.\n",
    "  \"\"\"\n",
    "  component_path = os.path.join(components_dir,\n",
    "                                component_name,\n",
    "                                'component.yaml.jinja')\n",
    "  # Read as UTF-8 explicitly so the result does not depend on the locale's\n",
    "  # default encoding.\n",
    "  with open(component_path, 'r', encoding='utf-8') as f:\n",
    "    template_text = f.read()\n",
    "\n",
    "  component_text = Template(template_text).render(\n",
    "    project_id=project_id,\n",
    "    af_registry_location=af_registry_location,\n",
    "    af_registry_name=af_registry_name)\n",
    "\n",
    "  return kfp.components.load_component_from_text(component_text)\n",
    "\n",
    "\n",
    "# Pre-bind the notebook-wide settings so callers only pass `component_name`.\n",
    "load_custom_component = partial(_load_custom_component,\n",
    "                                project_id=project_id,\n",
    "                                af_registry_location=af_registry_location,\n",
    "                                af_registry_name=af_registry_name,\n",
    "                                components_dir=components_dir)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "id": "495502ee",
   "metadata": {
    "jupyter": {
     "outputs_hidden": false
    },
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "preprocess_op = load_custom_component(component_name='data_preprocess')\n",
    "train_op = load_custom_component(component_name='train_model')\n",
    "check_metrics_op = load_custom_component(component_name='check_model_metrics')\n",
    "create_endpoint_op = load_custom_component(component_name='create_endpoint')\n",
    "test_endpoint_op = load_custom_component(component_name='test_endpoint')\n",
    "deploy_model_op = load_custom_component(component_name='deploy_model')\n",
    "monitor_model_op = load_custom_component(component_name='monitor_model')"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "0026cb75",
   "metadata": {},
   "source": [
    "Next, set the configuration values the pipeline run needs, then define the pipeline with the function below:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "id": "8d36a16a",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Region used when creating the API client, and the root location passed to\n",
    "# the pipeline run.\n",
    "pipeline_region = 'asia-southeast1'\n",
    "pipeline_root = 'gs://vertex_pipeline_demo_root/pipeline_root'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "id": "954a7af2",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Data-related pipeline parameters: input table, output folder, and the\n",
    "# `name:type` schema string handed to the training step.\n",
    "data_region = 'asia-southeast1'\n",
    "input_dataset_uri = 'bq://woven-rush-197905.vertex_pipeline_demo.banknote_authentication'\n",
    "gcs_data_output_folder = 'gs://vertex_pipeline_demo_root/datasets/training'\n",
    "training_data_schema = 'VWT:float;SWT:float;KWT:float;Entropy:float;Class:int'\n",
    "\n",
    "data_pipeline_root = 'gs://vertex_pipeline_demo_root/compute_root'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "id": "e6812d61",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Container images are addressed through the registry configured above; the\n",
    "# service account is derived from the project number.\n",
    "training_container_image_uri = f'{af_registry_location}-docker.pkg.dev/{project_id}/{af_registry_name}/training:latest'\n",
    "serving_container_image_uri = f'{af_registry_location}-docker.pkg.dev/{project_id}/{af_registry_name}/serving:latest'\n",
    "custom_job_service_account = f'{project_number}-compute@developer.gserviceaccount.com'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "id": "4574529c",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "('asia-southeast1-docker.pkg.dev/woven-rush-197905/mlops-vertex-kit/training:latest',\n",
       " 'asia-southeast1-docker.pkg.dev/woven-rush-197905/mlops-vertex-kit/serving:latest',\n",
       " '297370817971-compute@developer.gserviceaccount.com')"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "training_container_image_uri,serving_container_image_uri,custom_job_service_account"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "id": "7eb569d8",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'{\"num_leaves_hp_param_min\": 6, \"num_leaves_hp_param_max\": 11, \"max_depth_hp_param_min\": -1, \"max_depth_hp_param_max\": 4, \"num_boost_round\": 300, \"min_data_in_leaf\": 5}'"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "train_additional_args = json.dumps({\n",
    "    'num_leaves_hp_param_min': 6,\n",
    "    'num_leaves_hp_param_max': 11,\n",
    "    'max_depth_hp_param_min': -1,\n",
    "    'max_depth_hp_param_max': 4,\n",
    "    'num_boost_round': 300,\n",
    "    'min_data_in_leaf': 5\n",
    "})\n",
    "train_additional_args"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "id": "a36e0154",
   "metadata": {},
   "outputs": [],
   "source": [
    "@dsl.pipeline(name='training-pipeline-template')\n",
    "def pipeline(project_id: str,\n",
    "             data_region: str,\n",
    "             gcs_data_output_folder: str,\n",
    "             input_dataset_uri: str,\n",
    "             training_data_schema: str,\n",
    "             data_pipeline_root: str,\n",
    "\n",
    "             training_container_image_uri: str,\n",
    "             train_additional_args: str,\n",
    "             serving_container_image_uri: str,\n",
    "             custom_job_service_account: str,\n",
    "             hptune_region: str,\n",
    "             hp_config_suggestions_per_request: int,\n",
    "             hp_config_max_trials: int,\n",
    "\n",
    "             metrics_name: str,\n",
    "             metrics_threshold: float,\n",
    "\n",
    "             endpoint_machine_type: str,\n",
    "             endpoint_min_replica_count: int,\n",
    "             endpoint_max_replica_count: int,\n",
    "             endpoint_test_instances: str,\n",
    "\n",
    "             monitoring_user_emails: str,\n",
    "             monitoring_log_sample_rate: float,\n",
    "             monitor_interval: int,\n",
    "             monitoring_default_threshold: float,\n",
    "             monitoring_custom_skew_thresholds: str,\n",
    "             monitoring_custom_drift_thresholds: str,\n",
    "\n",
    "             machine_type: str = \"n1-standard-8\",\n",
    "             accelerator_count: int = 0,\n",
    "             accelerator_type: str = 'ACCELERATOR_TYPE_UNSPECIFIED',\n",
    "             vpc_network: str = \"\",\n",
    "             enable_model_monitoring: str = 'False'):\n",
    "    \"\"\"Defines the end-to-end training pipeline.\n",
    "\n",
    "    Steps, in dependency order:\n",
    "      1. Import `input_dataset_uri` as a Dataset artifact.\n",
    "      2. Preprocess it, writing to `gcs_data_output_folder` with CSV as the\n",
    "         requested output format.\n",
    "      3. Train a model on the preprocessed dataset.\n",
    "      4. Check the training metrics using `metrics_name` and\n",
    "         `metrics_threshold`.\n",
    "      5. Create the endpoint (with `create_if_not_exists=True`) and deploy the\n",
    "         trained model to it, but only after the metrics check has succeeded.\n",
    "      6. Send `endpoint_test_instances` to the endpoint after deployment.\n",
    "      7. Configure model monitoring after deployment, only when\n",
    "         `enable_model_monitoring` equals the string 'True'.\n",
    "    \"\"\"\n",
    "\n",
    "    dataset_importer = dsl.importer(\n",
    "      artifact_uri=input_dataset_uri,\n",
    "      artifact_class=Dataset,\n",
    "      reimport=False)\n",
    "\n",
    "    preprocess_task = preprocess_op(\n",
    "      project_id=project_id,\n",
    "      data_region=data_region,\n",
    "      gcs_output_folder=gcs_data_output_folder,\n",
    "      gcs_output_format=\"CSV\",\n",
    "      input_dataset=dataset_importer.output)\n",
    "\n",
    "    train_task = train_op(\n",
    "      project_id=project_id,\n",
    "      data_region=data_region,\n",
    "      data_pipeline_root=data_pipeline_root,\n",
    "      input_data_schema=training_data_schema,\n",
    "      training_container_image_uri=training_container_image_uri,\n",
    "      train_additional_args=train_additional_args,\n",
    "      serving_container_image_uri=serving_container_image_uri,\n",
    "      custom_job_service_account=custom_job_service_account,\n",
    "      input_dataset=preprocess_task.outputs['output_dataset'],\n",
    "      machine_type=machine_type,\n",
    "      accelerator_count=accelerator_count,\n",
    "      accelerator_type=accelerator_type,\n",
    "      hptune_region=hptune_region,\n",
    "      hp_config_max_trials=hp_config_max_trials,\n",
    "      hp_config_suggestions_per_request=hp_config_suggestions_per_request,\n",
    "      vpc_network=vpc_network)\n",
    "\n",
    "    # No downstream step consumes an output of this task, so it can only act\n",
    "    # as a gate by failing. NOTE(review): presumably the component fails when\n",
    "    # the metric is below the threshold; confirm against its implementation.\n",
    "    check_metrics_task = check_metrics_op(\n",
    "      metrics_name=metrics_name,\n",
    "      metrics_threshold=metrics_threshold,\n",
    "      basic_metrics=train_task.outputs['basic_metrics'])\n",
    "\n",
    "    create_endpoint_task = create_endpoint_op(\n",
    "      project_id=project_id,\n",
    "      data_region=data_region,\n",
    "      data_pipeline_root=data_pipeline_root,\n",
    "      display_name='endpoint-classification-template',\n",
    "      create_if_not_exists=True)\n",
    "\n",
    "    # Order deployment after the metrics check. Without this explicit\n",
    "    # dependency the two tasks are independent and the model could be deployed\n",
    "    # even though the check fails.\n",
    "    deploy_model_task = deploy_model_op(\n",
    "      project_id=project_id,\n",
    "      data_region=data_region,\n",
    "      data_pipeline_root=data_pipeline_root,\n",
    "      machine_type=endpoint_machine_type,\n",
    "      min_replica_count=endpoint_min_replica_count,\n",
    "      max_replica_count=endpoint_max_replica_count,\n",
    "      model=train_task.outputs['output_model'],\n",
    "      endpoint=create_endpoint_task.outputs['endpoint']\n",
    "    ).after(check_metrics_task)\n",
    "\n",
    "    # The test only takes the endpoint as input, so it is ordered after the\n",
    "    # deployment explicitly.\n",
    "    test_endpoint_task = test_endpoint_op(\n",
    "      project_id=project_id,\n",
    "      data_region=data_region,\n",
    "      data_pipeline_root=data_pipeline_root,\n",
    "      endpoint=create_endpoint_task.outputs['endpoint'],\n",
    "      test_instances=endpoint_test_instances,\n",
    "    ).after(deploy_model_task)\n",
    "\n",
    "    # The flag is a string parameter, so it is compared against 'True'.\n",
    "    with dsl.Condition(enable_model_monitoring == 'True', name='Monitoring'):\n",
    "        monitor_model_task = monitor_model_op(\n",
    "          project_id=project_id,\n",
    "          data_region=data_region,\n",
    "          user_emails=monitoring_user_emails,\n",
    "          log_sample_rate=monitoring_log_sample_rate,\n",
    "          monitor_interval=monitor_interval,\n",
    "          default_threshold=monitoring_default_threshold,\n",
    "          custom_skew_thresholds=monitoring_custom_skew_thresholds,\n",
    "          custom_drift_thresholds=monitoring_custom_drift_thresholds,\n",
    "          endpoint=create_endpoint_task.outputs['endpoint'],\n",
    "          instance_schema=train_task.outputs['instance_schema'],\n",
    "          dataset=preprocess_task.outputs['output_dataset'])\n",
    "        monitor_model_task.after(deploy_model_task)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ff5fba56",
   "metadata": {},
   "source": [
    "### Compile and run the end-to-end ML pipeline\n",
    "With our full pipeline defined, it's time to compile it:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "id": "794c18bb",
   "metadata": {},
   "outputs": [],
   "source": [
    "compiler.Compiler().compile(\n",
    "    pipeline_func=pipeline, \n",
    "    package_path=\"training_pipeline_job.json\"\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "575b575f",
   "metadata": {},
   "source": [
    "Next, instantiate an API client:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "id": "5ab349ab",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/Users/luoshixin/Develop/uob-mlops/venv/lib/python3.7/site-packages/kfp/v2/google/client/client.py:173: FutureWarning: AIPlatformClient will be deprecated in v1.9. Please use PipelineJob https://googleapis.dev/python/aiplatform/latest/_modules/google/cloud/aiplatform/pipeline_jobs.html in Vertex SDK. Install the SDK using \"pip install google-cloud-aiplatform\"\n",
      "  category=FutureWarning,\n"
     ]
    }
   ],
   "source": [
    "api_client = AIPlatformClient(\n",
    "    project_id=project_id,\n",
    "    region=pipeline_region)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "951f42d8",
   "metadata": {},
   "source": [
    "Next, kick off a pipeline run:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "id": "0423941b",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'[{\"VWT\": 3.6216, \"SWT\": 8.6661, \"KWT\": -2.8073, \"Entropy\": -0.44699, \"Class\": \"0\"}, {\"VWT\": 4.5459, \"SWT\": 8.1674, \"KWT\": -2.4586, \"Entropy\": -1.4621, \"Class\": \"0\"}, {\"VWT\": 3.866, \"SWT\": -2.6383, \"KWT\": 1.9242, \"Entropy\": 0.10645, \"Class\": \"0\"}, {\"VWT\": -3.7503, \"SWT\": -13.4586, \"KWT\": 17.5932, \"Entropy\": -2.7771, \"Class\": \"1\"}, {\"VWT\": -3.5637, \"SWT\": -8.3827, \"KWT\": 12.393, \"Entropy\": -1.2823, \"Class\": \"1\"}, {\"VWT\": -2.5419, \"SWT\": -0.65804, \"KWT\": 2.6842, \"Entropy\": 1.1952, \"Class\": \"1\"}]'"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "test_instances = json.dumps([\n",
    "\t\t{\"VWT\":3.6216,\"SWT\":8.6661,\"KWT\":-2.8073,\"Entropy\":-0.44699,\"Class\":\"0\"},\n",
    "\t\t{\"VWT\":4.5459,\"SWT\":8.1674,\"KWT\":-2.4586,\"Entropy\":-1.4621,\"Class\":\"0\"},\n",
    "\t\t{\"VWT\":3.866,\"SWT\":-2.6383,\"KWT\":1.9242,\"Entropy\":0.10645,\"Class\":\"0\"},\n",
    "\t\t{\"VWT\":-3.7503,\"SWT\":-13.4586,\"KWT\":17.5932,\"Entropy\":-2.7771,\"Class\":\"1\"},\n",
    "\t\t{\"VWT\":-3.5637,\"SWT\":-8.3827,\"KWT\":12.393,\"Entropy\":-1.2823,\"Class\":\"1\"},\n",
    "\t\t{\"VWT\":-2.5419,\"SWT\":-0.65804,\"KWT\":2.6842,\"Entropy\":1.1952,\"Class\":\"1\"}\n",
    "\t\t])\n",
    "test_instances"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "id": "5a3fe7dd",
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "See the Pipeline job <a href=\"https://console.cloud.google.com/vertex-ai/locations/asia-southeast1/pipelines/runs/training-pipeline-template-20211008145651?project=woven-rush-197905\" target=\"_blank\" >here</a>."
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    }
   ],
   "source": [
    "pipeline_params = {\n",
    "    'project_id': project_id,\n",
    "    'data_region': data_region,\n",
    "    'gcs_data_output_folder': gcs_data_output_folder,\n",
    "    'input_dataset_uri': input_dataset_uri,\n",
    "    'training_data_schema': training_data_schema,\n",
    "    'data_pipeline_root': data_pipeline_root,\n",
    "    \n",
    "    'training_container_image_uri': training_container_image_uri,\n",
    "    'train_additional_args': train_additional_args,\n",
    "    'serving_container_image_uri': serving_container_image_uri,\n",
    "    'custom_job_service_account': custom_job_service_account,\n",
    "    'hptune_region':\"asia-east1\",\n",
    "    'hp_config_suggestions_per_request': 5,\n",
    "    'hp_config_max_trials': 30,\n",
    "    \n",
    "    'metrics_name': 'au_prc',\n",
    "    'metrics_threshold': 0.4,\n",
    "    \n",
    "    'endpoint_machine_type': 'n1-standard-4',\n",
    "    'endpoint_min_replica_count': 1,\n",
    "    'endpoint_max_replica_count': 1,\n",
    "    'endpoint_test_instances': test_instances,\n",
    "    \n",
    "    'monitoring_user_emails': 'luoshixin@google.com',\n",
    "    'monitoring_log_sample_rate': 0.8,\n",
    "    'monitor_interval': 3600,\n",
    "    'monitoring_default_threshold': 0.3,\n",
    "    'monitoring_custom_skew_thresholds': 'VWT:.5,SWT:.2,KWT:.7,Entropy:.4',\n",
    "    'monitoring_custom_drift_thresholds': 'VWT:.5,SWT:.2,KWT:.7,Entropy:.4',\n",
    "    'enable_model_monitoring': 'True'\n",
    "}\n",
    "\n",
    "response = api_client.create_run_from_job_spec(\n",
    "    job_spec_path=\"training_pipeline_job.json\", \n",
    "    pipeline_root=pipeline_root,\n",
    "    parameter_values=pipeline_params,\n",
    "    enable_caching=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "109dd125",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "environment": {
   "name": "tf2-gpu.2-5.m74",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/tf2-gpu.2-5:m74"
  },
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
